Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sqs source: json codec support to split sqs message into multiple events #5330

Merged
merged 4 commits into from
Jan 15, 2025

Conversation

jmsusanto
Copy link
Contributor

Description

adds json codec support and functionality to split message into multiple events

#5054

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@jmsusanto jmsusanto changed the title json codec support and functionality to split message into multiple events sqs source: json codec support to split sqs message into multiple events Jan 14, 2025
eventMetadata.setAttribute(lowerCamelCaseKey, entry.getValue());
}

for (Map.Entry<String, MessageAttributeValue> entry : message.messageAttributes().entrySet()) {
Copy link
Member

@dlvenable dlvenable Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A lot of this code is similar to the RawSqsMessageHandler. We should not duplicate this code.

I think the best option is to update the existing RawSqsMessageHandler to support an injectable message strategy.

It might look like:

interface MessageFieldStrategy {
  List<Event> parseEvents(String messageBody);
}

This has some advantages of being able to use buffer.writeAll for the whole batch and the attribute code is shared for all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the BulkMessageHandler and added strategies instead. buffer.writeAll is also used instead of buffer.write now

for (Map.Entry<String, MessageAttributeValue> entry : message.messageAttributes().entrySet()) {
final String originalKey = entry.getKey();
final String lowerCamelCaseKey = originalKey.substring(0, 1).toLowerCase() + originalKey.substring(1);;
eventMetadata.setAttribute(lowerCamelCaseKey, entry.getValue().stringValue());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A lot of this processing can be shared, which has the added benefit of avoiding extra memory. You can do this in the refactoring that I suggest above.

That is, before the loop over the List<Event>, create a Map<String, String> for all the attributes. Then re-use those values. This should reduce both compute and memory.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great idea, the metadata/attributes of a bulk message would remain the same for every event in that message

@@ -0,0 +1,73 @@
/*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the longer copyright header.

https://github.com/opensearch-project/data-prepper/blob/90575b1de56f82f44d1af36f31ff4b077a627bd7/CONTRIBUTING.md#license-headers

/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
*/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added to every file

Jeremy Michael added 2 commits January 14, 2025 14:11
Copy link
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @jmsusanto . This is looking good. I have a few smaller comments.

import java.util.List;
import java.util.function.Consumer;

public class JsonBulkMessageFieldStrategy implements MessageFieldStrategy {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just rename this to CodecBulkMessageFieldStrategy because it is not bound to JSON.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

} else {
sqsEventProcessor = new SqsEventProcessor(new RawSqsMessageHandler());
MessageFieldStrategy standardStrategy = new StandardMessageFieldStrategy();
sqsEventProcessor = new SqsEventProcessor(new RawSqsMessageHandler(standardStrategy));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line and line 89 are the same. Refactor to use the same line. Just get the MessageFieldStrategy from the conditional.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that makes sense, i made the change

import java.util.List;

public interface MessageFieldStrategy {
// Convert the SQS message body into one or more events.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this into a Javadoc comment.

Signed-off-by: Jeremy Michael <[email protected]>
Copy link
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for this improvement!

@sb2k16 sb2k16 merged commit 7c3681f into opensearch-project:main Jan 15, 2025
43 of 47 checks passed
@jmsusanto jmsusanto deleted the sqs branch January 17, 2025 19:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants